/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.swak.stroage;

import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.sun.jna.NativeLong;
import com.sun.jna.Pointer;

import sun.nio.ch.DirectBuffer;

/**
 * File-backed storage unit: a single file that is memory-mapped over its whole
 * length and, optionally, staged through a buffer borrowed from a
 * {@link TransientStorePool}.
 * <p>
 * Three positions are tracked, all relative to the start of the file:
 * <ul>
 * <li>{@code wrotePosition} - end of the data written so far;</li>
 * <li>{@code committedPosition} - end of the data copied from
 * {@code writeBuffer} to the file channel (only meaningful while a write
 * buffer is attached);</li>
 * <li>{@code flushedPosition} - end of the data that has been forced to
 * disk.</li>
 * </ul>
 * Reference counting ({@code hold()}, {@code release()}, {@code shutdown()},
 * {@code isAvailable()}, {@code isCleanupOver()}) is used unqualified here and
 * is presumably inherited from {@link ReferenceResource}.
 *
 * @author DELL
 */
@SuppressWarnings("restriction")
public class MappedFile extends ReferenceResource {
	/** Page size, in bytes, used for the flush/commit thresholds and for warm-up strides. */
	public static final int OS_PAGE_SIZE = 1024 * 4;
	protected static final Logger log = LoggerFactory.getLogger(MappedFile.class);

	/** Sum of the sizes of all currently mapped files. */
	private static final AtomicLong TOTAL_MAPPED_VIRTUAL_MEMORY = new AtomicLong(0);

	/** Number of currently mapped files. */
	private static final AtomicInteger TOTAL_MAPPED_FILES = new AtomicInteger(0);
	protected final AtomicInteger wrotePosition = new AtomicInteger(0);
	protected final AtomicInteger committedPosition = new AtomicInteger(0);
	private final AtomicInteger flushedPosition = new AtomicInteger(0);
	protected int fileSize;
	protected FileChannel fileChannel;
	/**
	 * Message will put to here first, and then reput to FileChannel if writeBuffer
	 * is not null.
	 */
	protected ByteBuffer writeBuffer = null;
	protected TransientStorePool transientStorePool = null;
	private String fileName;
	/** Numeric value parsed from the file's simple name; added to positions to form absolute offsets. */
	private long fileFromOffset;
	private File file;
	private MappedByteBuffer mappedByteBuffer;
	private volatile long storeTimestamp = 0;
	private boolean firstCreateInQueue = false;

	/**
	 * Creates an uninitialized instance; one of the {@code init} methods must be
	 * called before the instance is usable.
	 */
	public MappedFile() {
	}

	/**
	 * Opens (creating if necessary) and maps the given file.
	 *
	 * @param fileName path of the file; its simple name must parse as a {@code long}
	 * @param fileSize number of bytes to map
	 * @throws IOException if the file cannot be opened or mapped
	 */
	public MappedFile(final String fileName, final int fileSize) throws IOException {
		init(fileName, fileSize);
	}

	/**
	 * Opens and maps the given file and borrows a staging buffer from the pool.
	 *
	 * @param fileName           path of the file; its simple name must parse as a {@code long}
	 * @param fileSize           number of bytes to map
	 * @param transientStorePool pool the write buffer is borrowed from; must not be {@code null}
	 * @throws IOException if the file cannot be opened or mapped
	 */
	public MappedFile(final String fileName, final int fileSize, final TransientStorePool transientStorePool)
			throws IOException {
		init(fileName, fileSize, transientStorePool);
	}

	/**
	 * Creates the directory (including missing parents) when it does not exist
	 * yet and logs the outcome. A {@code null} name is ignored.
	 *
	 * @param dirName directory path, may be {@code null}
	 */
	public static void ensureDirOK(final String dirName) {
		if (dirName == null) {
			return;
		}
		File dir = new File(dirName);
		if (!dir.exists()) {
			boolean created = dir.mkdirs();
			log.info(dirName + " mkdir " + (created ? "OK" : "Failed"));
		}
	}

	/**
	 * Releases a direct buffer by reflectively calling {@code cleaner()} on the
	 * innermost buffer it is a view of, and then {@code clean()} on the returned
	 * object. Does nothing for {@code null}, non-direct or zero-capacity buffers.
	 *
	 * @param buffer buffer to release, may be {@code null}
	 * @throws IllegalStateException if any reflective lookup or call fails
	 */
	public static void clean(final ByteBuffer buffer) {
		if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) {
			return;
		}
		Object cleaner = invoke(viewed(buffer), "cleaner");
		invoke(cleaner, "clean");
	}

	/**
	 * Looks up {@code methodName} on the target's class, makes it accessible and
	 * invokes it without arguments inside a privileged action.
	 *
	 * @param args parameter types used for the lookup only; no argument values are passed
	 * @return the value returned by the invoked method
	 * @throws IllegalStateException wrapping any exception raised by the lookup or the call
	 */
	private static Object invoke(final Object target, final String methodName, final Class<?>... args) {
		return AccessController.doPrivileged(new PrivilegedAction<Object>() {
			@Override
			public Object run() {
				try {
					Method method = method(target, methodName, args);
					method.setAccessible(true);
					return method.invoke(target);
				} catch (Exception e) {
					throw new IllegalStateException(e);
				}
			}
		});
	}

	/**
	 * Resolves a method by name, trying the public methods first and falling back
	 * to the methods declared directly on the target's class.
	 */
	private static Method method(Object target, String methodName, Class<?>[] args) throws NoSuchMethodException {
		try {
			return target.getClass().getMethod(methodName, args);
		} catch (NoSuchMethodException e) {
			return target.getClass().getDeclaredMethod(methodName, args);
		}
	}

	/**
	 * Follows the chain of "viewed" buffers down to the innermost one. The
	 * accessor is called {@code attachment} when the buffer class exposes a public
	 * method of that name, otherwise {@code viewedBuffer}.
	 */
	private static ByteBuffer viewed(ByteBuffer buffer) {
		String methodName = "viewedBuffer";
		for (Method candidate : buffer.getClass().getMethods()) {
			if (candidate.getName().equals("attachment")) {
				methodName = "attachment";
				break;
			}
		}

		ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName);
		if (viewedBuffer == null) {
			return buffer;
		}
		return viewed(viewedBuffer);
	}

	/** @return the number of currently mapped files */
	public static int getTotalMappedFiles() {
		return TOTAL_MAPPED_FILES.get();
	}

	/** @return the total number of bytes currently mapped by all instances */
	public static long getTotalMappedVirtualMemory() {
		return TOTAL_MAPPED_VIRTUAL_MEMORY.get();
	}

	/**
	 * Opens and maps the file, then borrows the staging write buffer from the
	 * pool.
	 *
	 * @param fileName           path of the file; its simple name must parse as a {@code long}
	 * @param fileSize           number of bytes to map
	 * @param transientStorePool pool the write buffer is borrowed from; must not be {@code null}
	 * @throws IOException          if the file cannot be opened or mapped
	 * @throws NullPointerException if {@code transientStorePool} is {@code null}
	 */
	public void init(final String fileName, final int fileSize, final TransientStorePool transientStorePool)
			throws IOException {
		// Fail before opening/mapping anything, so a null pool cannot leave a
		// mapped file behind that nobody owns.
		if (transientStorePool == null) {
			throw new NullPointerException("transientStorePool");
		}
		init(fileName, fileSize);
		this.writeBuffer = transientStorePool.borrowBuffer();
		this.transientStorePool = transientStorePool;
	}

	/**
	 * Opens the file read/write, maps {@code fileSize} bytes of it and updates the
	 * global mapping counters. The parent directory is created when missing. If
	 * mapping fails the channel is closed again before the exception propagates.
	 *
	 * @throws IOException           if the file cannot be opened or mapped
	 * @throws NumberFormatException if the file's simple name is not a valid {@code long}
	 */
	@SuppressWarnings("resource")
	private void init(final String fileName, final int fileSize) throws IOException {
		this.fileName = fileName;
		this.fileSize = fileSize;
		this.file = new File(fileName);
		this.fileFromOffset = Long.parseLong(this.file.getName());
		boolean ok = false;

		ensureDirOK(this.file.getParent());

		try {
			this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
			this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
			TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
			TOTAL_MAPPED_FILES.incrementAndGet();
			ok = true;
		} catch (FileNotFoundException e) {
			log.error("Failed to create file " + this.fileName, e);
			throw e;
		} catch (IOException e) {
			log.error("Failed to map file " + this.fileName, e);
			throw e;
		} finally {
			if (!ok && this.fileChannel != null) {
				this.fileChannel.close();
			}
		}
	}

	/** @return the last-modified time of the underlying file, as reported by {@link File#lastModified()} */
	public long getLastModifiedTimestamp() {
		return this.file.lastModified();
	}

	public int getFileSize() {
		return fileSize;
	}

	public FileChannel getFileChannel() {
		return fileChannel;
	}

	/** @return the numeric value parsed from the file's simple name */
	public long getFileFromOffset() {
		return this.fileFromOffset;
	}

	/**
	 * Writes the whole array to the file channel at the current write position.
	 *
	 * @param data bytes to append
	 * @return {@code true} if the data fit into the file, {@code false} if it would exceed {@code fileSize}
	 */
	public boolean appendMessage(final byte[] data) {
		return appendToChannel(data, 0, data.length);
	}

	/**
	 * Content of data from offset to offset + length will be wrote to file.
	 * <p>
	 * A {@code null} array or a range outside the array is rejected up front:
	 * an error is logged, {@code false} is returned and the write position is
	 * left untouched.
	 *
	 * @param data   source array
	 * @param offset The offset of the subarray to be used.
	 * @param length The length of the subarray to be used.
	 * @return {@code true} if the data fit into the file, {@code false} if it would
	 *         exceed {@code fileSize} or the range is invalid
	 */
	public boolean appendMessage(final byte[] data, final int offset, final int length) {
		if (data == null || offset < 0 || length < 0 || offset > data.length - length) {
			log.error("appendMessage rejected invalid range, offset: " + offset + ", length: " + length
					+ ", data.length: " + (data == null ? "null" : String.valueOf(data.length)));
			return false;
		}
		return appendToChannel(data, offset, length);
	}

	/**
	 * Shared implementation of both {@code appendMessage} overloads; expects an
	 * already validated range.
	 */
	private boolean appendToChannel(final byte[] data, final int offset, final int length) {
		int currentPos = this.wrotePosition.get();

		// long arithmetic: currentPos + length must not wrap around and pass the check
		if ((long) currentPos + length > this.fileSize) {
			return false;
		}

		try {
			this.fileChannel.position(currentPos);
			this.fileChannel.write(ByteBuffer.wrap(data, offset, length));
		} catch (Throwable e) {
			// NOTE(review): a failed write is only logged; the write position still
			// advances and true is returned. Kept as-is because callers may rely on it.
			log.error("Error occurred when append message to mappedFile.", e);
		}
		this.wrotePosition.addAndGet(length);
		return true;
	}

	/**
	 * Forces pending data to disk when enough of it has accumulated.
	 * <p>
	 * The file channel is forced when a write buffer is attached or the channel
	 * position is non-zero; otherwise the mapped buffer is forced.
	 *
	 * @param flushLeastPages minimum number of {@link #OS_PAGE_SIZE} pages that must
	 *                        be pending; a value {@code <= 0} flushes whenever
	 *                        anything is pending. A full file is always flushed.
	 * @return The current flushed position
	 */
	public int flush(final int flushLeastPages) {
		if (this.isAbleToFlush(flushLeastPages)) {
			if (this.hold()) {
				int value = getReadPosition();

				try {
					// We only append data to fileChannel or mappedByteBuffer, never both.
					if (writeBuffer != null || this.fileChannel.position() != 0) {
						this.fileChannel.force(false);
					} else {
						this.mappedByteBuffer.force();
					}
				} catch (Throwable e) {
					log.error("Error occurred when force data to disk.", e);
				}

				this.flushedPosition.set(value);
				this.release();
			} else {
				log.warn("in flush, hold failed, flush offset = " + this.flushedPosition.get());
				this.flushedPosition.set(getReadPosition());
			}
		}
		return this.getFlushedPosition();
	}

	/**
	 * Copies pending data from the write buffer to the file channel when enough
	 * of it has accumulated. Once the whole file has been committed the write
	 * buffer is handed back to the pool.
	 *
	 * @param commitLeastPages minimum number of {@link #OS_PAGE_SIZE} pages that must
	 *                         be pending; a value {@code <= 0} commits whenever
	 *                         anything is pending. A full file is always committed.
	 * @return the committed position, or the write position when no write buffer is attached
	 */
	public int commit(final int commitLeastPages) {
		if (writeBuffer == null) {
			// no need to commit data to file channel, so just regard wrotePosition as
			// committedPosition.
			return this.wrotePosition.get();
		}
		if (this.isAbleToCommit(commitLeastPages)) {
			if (this.hold()) {
				commit0(commitLeastPages);
				this.release();
			} else {
				log.warn("in commit, hold failed, commit offset = " + this.committedPosition.get());
			}
		}

		// All dirty data has been committed to FileChannel.
		if (writeBuffer != null && this.transientStorePool != null && this.fileSize == this.committedPosition.get()) {
			this.transientStorePool.returnBuffer(writeBuffer);
			this.writeBuffer = null;
		}

		return this.committedPosition.get();
	}

	/**
	 * Writes the bytes between the committed position and the write position from
	 * the write buffer to the file channel and advances the committed position.
	 * Failures are logged and leave the committed position unchanged.
	 *
	 * @param commitLeastPages unused here and kept for signature compatibility; the
	 *                         page threshold is evaluated by
	 *                         {@link #isAbleToCommit(int)} before {@link #commit(int)}
	 *                         calls this method
	 */
	protected void commit0(final int commitLeastPages) {
		int writePos = this.wrotePosition.get();
		int lastCommittedPosition = this.committedPosition.get();

		// Commit whenever at least one byte is pending. Comparing the byte count
		// with commitLeastPages (a page count) would skip short tails, e.g. the
		// last few bytes of a full file.
		if (writePos - lastCommittedPosition > 0) {
			try {
				ByteBuffer byteBuffer = writeBuffer.slice();
				byteBuffer.position(lastCommittedPosition);
				byteBuffer.limit(writePos);
				this.fileChannel.position(lastCommittedPosition);
				this.fileChannel.write(byteBuffer);
				this.committedPosition.set(writePos);
			} catch (Throwable e) {
				log.error("Error occurred when commit data to FileChannel.", e);
			}
		}
	}

	/**
	 * @return {@code true} if the file is full, or the number of whole pages between
	 *         the flushed and the readable position reaches {@code flushLeastPages},
	 *         or (for {@code flushLeastPages <= 0}) anything is pending
	 */
	private boolean isAbleToFlush(final int flushLeastPages) {
		int flush = this.flushedPosition.get();
		int write = getReadPosition();

		if (this.isFull()) {
			return true;
		}

		if (flushLeastPages > 0) {
			return ((write / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE)) >= flushLeastPages;
		}

		return write > flush;
	}

	/**
	 * @return {@code true} if the file is full, or the number of whole pages between
	 *         the committed and the write position reaches {@code commitLeastPages},
	 *         or (for {@code commitLeastPages <= 0}) anything is pending
	 */
	protected boolean isAbleToCommit(final int commitLeastPages) {
		int committed = this.committedPosition.get();
		int write = this.wrotePosition.get();

		if (this.isFull()) {
			return true;
		}

		if (commitLeastPages > 0) {
			return ((write / OS_PAGE_SIZE) - (committed / OS_PAGE_SIZE)) >= commitLeastPages;
		}

		return write > committed;
	}

	public int getFlushedPosition() {
		return flushedPosition.get();
	}

	public void setFlushedPosition(int pos) {
		this.flushedPosition.set(pos);
	}

	/** @return {@code true} when the write position equals the file size */
	public boolean isFull() {
		return this.fileSize == this.wrotePosition.get();
	}

	/**
	 * Returns a view of {@code size} bytes of the mapped buffer starting at
	 * {@code pos}, provided the range lies entirely within the readable data.
	 * <p>
	 * On success a reference is taken via {@code hold()} and this file is passed
	 * to the result (presumably so the result can release it later - confirm in
	 * {@link SelectMappedBufferResult}).
	 *
	 * @param pos  start position inside this file, must be {@code >= 0}
	 * @param size number of bytes, must be {@code >= 0}
	 * @return the selected view, or {@code null} if the range is invalid or the
	 *         reference could not be taken
	 */
	public SelectMappedBufferResult selectMappedBuffer(int pos, int size) {
		int readPosition = getReadPosition();
		// Validate before hold(): a negative or overflowing range would otherwise
		// make position()/limit() throw after the reference was already taken,
		// leaking that reference.
		boolean rangeValid = pos >= 0 && size >= 0 && ((long) pos + size) <= readPosition;
		if (rangeValid) {
			if (this.hold()) {
				return new SelectMappedBufferResult(this.fileFromOffset + pos, sliceMapped(pos, size), size, this);
			} else {
				log.warn("matched, but hold failed, request pos: " + pos + ", fileFromOffset: " + this.fileFromOffset);
			}
		} else {
			log.warn("selectMappedBuffer request pos invalid, request pos: " + pos + ", size: " + size
					+ ", fileFromOffset: " + this.fileFromOffset);
		}

		return null;
	}

	/**
	 * Returns a view of the mapped buffer from {@code pos} up to the current read
	 * position.
	 * <p>
	 * On success a reference is taken via {@code hold()} and this file is passed
	 * to the result.
	 *
	 * @param pos start position inside this file
	 * @return the selected view, or {@code null} if {@code pos} is negative, not
	 *         below the read position, or the reference could not be taken
	 */
	public SelectMappedBufferResult selectMappedBuffer(int pos) {
		int readPosition = getReadPosition();
		if (pos < readPosition && pos >= 0) {
			if (this.hold()) {
				int size = readPosition - pos;
				return new SelectMappedBufferResult(this.fileFromOffset + pos, sliceMapped(pos, size), size, this);
			}
		}

		return null;
	}

	/**
	 * Builds an independent view of the mapped buffer that starts at {@code pos}
	 * and is limited to {@code size} bytes. The range must already be validated.
	 */
	private ByteBuffer sliceMapped(final int pos, final int size) {
		ByteBuffer whole = this.mappedByteBuffer.slice();
		whole.position(pos);
		ByteBuffer view = whole.slice();
		view.limit(size);
		return view;
	}

	/**
	 * Unmaps the file and updates the global mapping counters.
	 *
	 * @param currentRef reference count, used for logging only
	 * @return {@code false} if the file is still available (nothing is unmapped),
	 *         {@code true} if it was unmapped now or cleanup had already happened
	 */
	@Override
	public boolean cleanup(final long currentRef) {
		if (this.isAvailable()) {
			log.error("this file[REF:" + currentRef + "] " + this.fileName + " have not shutdown, stop unmapping.");
			return false;
		}

		if (this.isCleanupOver()) {
			log.error("this file[REF:" + currentRef + "] " + this.fileName + " have cleanup, do not do it again.");
			return true;
		}

		clean(this.mappedByteBuffer);
		TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
		TOTAL_MAPPED_FILES.decrementAndGet();
		log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
		return true;
	}

	/**
	 * Shuts the resource down and, once cleanup is over, closes the channel and
	 * deletes the file. Failures while closing or deleting are logged only.
	 *
	 * @param intervalForcibly passed through to {@code shutdown}
	 * @return {@code true} if cleanup was over (even when close/delete failed),
	 *         {@code false} otherwise
	 */
	public boolean destroy(final long intervalForcibly) {
		this.shutdown(intervalForcibly);

		if (this.isCleanupOver()) {
			try {
				this.fileChannel.close();
				log.info("close file channel " + this.fileName + " OK");

				long beginTime = System.currentTimeMillis();
				boolean result = this.file.delete();
				log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
						+ (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
						+ this.getFlushedPosition() + ", " + (System.currentTimeMillis() - beginTime));
			} catch (Exception e) {
				log.warn("close file channel " + this.fileName + " Failed. ", e);
			}

			return true;
		} else {
			log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: "
					+ this.cleanupOver);
		}

		return false;
	}

	public int getWrotePosition() {
		return wrotePosition.get();
	}

	public void setWrotePosition(int pos) {
		this.wrotePosition.set(pos);
	}

	/**
	 * @return The max position which have valid data: the write position when no
	 *         write buffer is attached, otherwise the committed position
	 */
	public int getReadPosition() {
		return this.writeBuffer == null ? this.wrotePosition.get() : this.committedPosition.get();
	}

	public void setCommittedPosition(int pos) {
		this.committedPosition.set(pos);
	}

	/**
	 * Touches one byte in every page of the mapping (writing zero), optionally
	 * forcing the mapping to disk along the way, and finally calls
	 * {@link #mlock()}.
	 * <p>
	 * If the thread is interrupted during warm-up, the warm-up still runs to
	 * completion and the interrupt status is restored before returning.
	 *
	 * @param type  when {@code SYNC_FLUSH}, the mapping is forced every
	 *              {@code pages} pages and once more at the end
	 * @param pages number of pages between two forced flushes
	 */
	public void warmMappedFile(FlushDiskType type, int pages) {
		long beginTime = System.currentTimeMillis();
		ByteBuffer byteBuffer = this.mappedByteBuffer.slice();
		int flush = 0;
		long time = System.currentTimeMillis();
		boolean interrupted = false;
		try {
			for (int i = 0, j = 0; i < this.fileSize; i += MappedFile.OS_PAGE_SIZE, j++) {
				byteBuffer.put(i, (byte) 0);
				// force flush when flush disk type is sync
				if (type == FlushDiskType.SYNC_FLUSH) {
					if ((i / OS_PAGE_SIZE) - (flush / OS_PAGE_SIZE) >= pages) {
						flush = i;
						mappedByteBuffer.force();
					}
				}

				// prevent gc
				// NOTE(review): presumably sleep(0) is meant to let the JVM reach a
				// safepoint during this long loop - confirm.
				if (j % 1000 == 0) {
					log.info("j={}, costTime={}", j, System.currentTimeMillis() - time);
					time = System.currentTimeMillis();
					try {
						Thread.sleep(0);
					} catch (InterruptedException e) {
						log.error("Interrupted", e);
						// sleep() cleared the flag; remember it so it can be restored below
						interrupted = true;
					}
				}
			}

			// force flush when prepare load finished
			if (type == FlushDiskType.SYNC_FLUSH) {
				log.info("mapped file warm-up done, force to disk, mappedFile={}, costTime={}", this.getFileName(),
						System.currentTimeMillis() - beginTime);
				mappedByteBuffer.force();
			}
			log.info("mapped file warm-up done. mappedFile={}, costTime={}", this.getFileName(),
					System.currentTimeMillis() - beginTime);

			this.mlock();
		} finally {
			if (interrupted) {
				// Hand the interruption on to the caller instead of swallowing it.
				Thread.currentThread().interrupt();
			}
		}
	}

	public String getFileName() {
		return fileName;
	}

	public MappedByteBuffer getMappedByteBuffer() {
		return mappedByteBuffer;
	}

	/** @return a new slice of the mapped buffer */
	public ByteBuffer sliceByteBuffer() {
		return this.mappedByteBuffer.slice();
	}

	public long getStoreTimestamp() {
		return storeTimestamp;
	}

	public boolean isFirstCreateInQueue() {
		return firstCreateInQueue;
	}

	public void setFirstCreateInQueue(boolean firstCreateInQueue) {
		this.firstCreateInQueue = firstCreateInQueue;
	}

	/**
	 * Calls the native {@code mlock} and then {@code madvise(MADV_WILLNEED)} on
	 * the mapped region and logs both return codes. The return codes are not
	 * otherwise evaluated.
	 */
	public void mlock() {
		final long beginTime = System.currentTimeMillis();
		final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
		Pointer pointer = new Pointer(address);
		{
			int ret = LibC.INSTANCE.mlock(pointer, new NativeLong(this.fileSize));
			log.info("mlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret,
					System.currentTimeMillis() - beginTime);
		}

		{
			int ret = LibC.INSTANCE.madvise(pointer, new NativeLong(this.fileSize), LibC.MADV_WILLNEED);
			log.info("madvise {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret,
					System.currentTimeMillis() - beginTime);
		}
	}

	/**
	 * Calls the native {@code munlock} on the mapped region and logs the return
	 * code.
	 */
	public void munlock() {
		final long beginTime = System.currentTimeMillis();
		final long address = ((DirectBuffer) (this.mappedByteBuffer)).address();
		Pointer pointer = new Pointer(address);
		int ret = LibC.INSTANCE.munlock(pointer, new NativeLong(this.fileSize));
		log.info("munlock {} {} {} ret = {} time consuming = {}", address, this.fileName, this.fileSize, ret,
				System.currentTimeMillis() - beginTime);
	}

	// testable
	File getFile() {
		return this.file;
	}

	/** @return the file name this instance was initialized with */
	@Override
	public String toString() {
		return this.fileName;
	}
}
